/*******************************************************************************
 * Package: com.song.visit
 * Type:    UserVisitMap1
 * Date:    2024-11-21 19:00
 *
 * Copyright (c) 2024 LTD All Rights Reserved.
 *
 * You may not use this file except in compliance with the License.
 *******************************************************************************/
package com.song.visit;

import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Spark job that counts clicks, orders and payments per category from the user visit action log
 * and prints the first {@value #TOP_N} aggregated records after sorting.
 *
 * <p>Each input line is split on {@code '_'}. Search actions (search-keyword field not equal to
 * {@code "null"}) are discarded. Each remaining row becomes one record per affected category.
 * Records are summed per category ID and then sorted.
 *
 * <p>Usage: {@code UserVisitMap1 [inputPath]}. When no path is given,
 * {@value #DEFAULT_INPUT} is read.
 *
 * @author Songxianyang
 * @date 2024-11-21 19:00
 */
public class UserVisitMap1 {

    /** Input read when no path is passed on the command line (relative to the working directory). */
    private static final String DEFAULT_INPUT = "yqmm-spark/file/user_visit_action.txt";

    /** Index of the search keyword field; the literal {@code "null"} means "not a search". */
    private static final int SEARCH_KEYWORD = 5;
    /** Index of the clicked category ID; {@code "-1"} means "not a click". */
    private static final int CLICK_CATEGORY = 6;
    /** Index of the comma-separated ordered category IDs; {@code "null"} means "not an order". */
    private static final int ORDER_CATEGORIES = 8;
    /** Index of the comma-separated paid category IDs; {@code "null"} means "not a payment". */
    private static final int PAY_CATEGORIES = 10;

    /** Number of records printed from the sorted result. */
    private static final int TOP_N = 10;

    public static void main(String[] args) {
        // 1. Configuration: default to local mode, but let spark-submit --master
        //    (or an existing spark.master setting) take precedence.
        SparkConf conf = new SparkConf()
                .setAppName("sparkCore")
                .setIfMissing("spark.master", "local[*]");
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;

        // 2. try-with-resources: the context is stopped even if the job fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            JavaRDD<String> lines = sc.textFile(input);

            // Drop search actions (and rows too short to classify).
            JavaRDD<String> nonSearchRows = lines.filter(UserVisitMap1::isNonSearchAction);

            nonSearchRows
                    // One record per (category, click/order/payment) event.
                    .flatMap(UserVisitMap1::toVisitRecords)
                    // Pair up: (categoryId, (categoryId, clicks, orders, payments)).
                    .mapToPair(visit -> new Tuple2<>(visit.getProductId(), visit))
                    // Sum the three counters for records sharing the same category ID.
                    .reduceByKey(UserVisitMap1::merge)
                    // Keep only the aggregated records: (categoryId, clicks, orders, payments).
                    .values()
                    // NOTE(review): the ranking relies on UserVisitDto's natural ordering (its
                    // compareTo), which is defined outside this file — confirm that ascending
                    // order puts the hottest categories first.
                    .sortBy(visit -> visit, true, 2)
                    .take(TOP_N)
                    .forEach(System.out::println);
        }
    }

    /**
     * Returns whether {@code row} is a non-search action, i.e. its search-keyword field is the
     * literal {@code "null"}. Rows too short to contain that field are rejected instead of
     * failing the job.
     */
    private static boolean isNonSearchAction(String row) {
        String[] fields = row.split("_");
        return fields.length > SEARCH_KEYWORD && "null".equals(fields[SEARCH_KEYWORD]);
    }

    /**
     * Converts one non-search row into its per-category records.
     *
     * <ul>
     *   <li>click (field {@value #CLICK_CATEGORY} != "-1"): one record (category, 1, 0, 0)</li>
     *   <li>order (field {@value #ORDER_CATEGORIES} != "null"): (category, 0, 1, 0) per category ID</li>
     *   <li>payment (field {@value #PAY_CATEGORIES} != "null"): (category, 0, 0, 1) per category ID</li>
     * </ul>
     *
     * The checks are applied in that order and the first match wins. A row matching none of them,
     * or too short to hold the relevant field, yields no records. The original fall-through
     * instead produced a bogus {@code "null"} category in that case.
     */
    private static Iterator<UserVisitDto> toVisitRecords(String row) {
        String[] fields = row.split("_");
        if (fields.length > CLICK_CATEGORY && !"-1".equals(fields[CLICK_CATEGORY])) {
            return Collections.singletonList(
                    new UserVisitDto(fields[CLICK_CATEGORY], 1L, 0L, 0L)).iterator();
        }
        if (fields.length > ORDER_CATEGORIES && !"null".equals(fields[ORDER_CATEGORIES])) {
            return perCategory(fields[ORDER_CATEGORIES], 0L, 1L, 0L);
        }
        if (fields.length > PAY_CATEGORIES && !"null".equals(fields[PAY_CATEGORIES])) {
            return perCategory(fields[PAY_CATEGORIES], 0L, 0L, 1L);
        }
        return Collections.emptyIterator();
    }

    /** Builds one record with the given counts for each comma-separated ID in {@code categoryIds}. */
    private static Iterator<UserVisitDto> perCategory(
            String categoryIds, long clicks, long orders, long payments) {
        List<UserVisitDto> records = new ArrayList<>();
        for (String categoryId : categoryIds.split(",")) {
            records.add(new UserVisitDto(categoryId, clicks, orders, payments));
        }
        return records.iterator();
    }

    /**
     * Adds {@code other}'s click, order and payment counts into {@code acc} and returns
     * {@code acc}. The first argument is mutated. This is acceptable here because every record is
     * freshly created by {@link #toVisitRecords} and the upstream RDD is not cached or reused.
     */
    private static UserVisitDto merge(UserVisitDto acc, UserVisitDto other) {
        acc.setNumberOrders(acc.getNumberOrders() + other.getNumberOrders());
        acc.setClicks(acc.getClicks() + other.getClicks());
        acc.setNumberPayments(acc.getNumberPayments() + other.getNumberPayments());
        return acc;
    }
}
